/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*****************************************************************************
 * $Id: dbrm.cpp 1878 2013-05-02 15:17:12Z dcathey $
 *
 ****************************************************************************/

#include <iostream>
#include <sys/types.h>
#include <vector>
#ifdef __linux__
#include <values.h>
#endif
#include <boost/thread.hpp>
//#define NDEBUG
#include <cassert>

#include "oamcache.h"
#include "rwlock.h"
#include "mastersegmenttable.h"
#include "extentmap.h"
#include "copylocks.h"
#include "vss.h"
#include "vbbm.h"
#include "socketclosed.h"
#include "configcpp.h"
#include "sessionmanagerserver.h"
#include "messagequeuepool.h"
#define DBRM_DLLEXPORT
#include "dbrm.h"
#undef DBRM_DLLEXPORT

#ifdef BRM_DEBUG
#define CHECK_EMPTY(x) \
	if (x.length() != 0) \
		throw logic_error("DBRM: got a message of the wrong size");
#else
#define CHECK_EMPTY(x)
#endif

#define DO_ERR_NETWORK \
	MessageQueueClientPool::releaseInstance(msgClient); \
	msgClient = NULL; \
	mutex.unlock(); \
	return ERR_NETWORK;

using namespace std;
using namespace messageqcpp;
using namespace oam;

#ifdef BRM_INFO
#include "tracer.h"
#endif

namespace BRM
{

DBRM::DBRM(bool noBRMinit) throw() : fDebug(false)
{
    if (!noBRMinit)
    {
        mst.reset(new MasterSegmentTable());
        em.reset(new ExtentMap());
        vss.reset(new VSS());
        vbbm.reset(new VBBM());
        copylocks.reset(new CopyLocks());

        em->setReadOnly();
        vss->setReadOnly();
        vbbm->setReadOnly();
    }

    msgClient = NULL;
    masterName = "DBRM_Controller";
    config = config::Config::makeConfig();
#ifdef BRM_INFO
    fDebug = ("Y" == config->getConfig("DBRM", "Debug"));
#endif
}

DBRM::DBRM(const DBRM& brm)
{
    throw logic_error("DBRM: Don't use the copy constructor.");
}

DBRM::~DBRM() throw()
{
    if (msgClient != NULL)
        MessageQueueClientPool::releaseInstance(msgClient);
}

DBRM& DBRM::operator=(const DBRM& brm)
{
    throw logic_error("DBRM: Don't use the = operator.");
}


int DBRM::saveState() throw()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("saveState()");

#endif
    string prefix = config->getConfig("SystemConfig", "DBRMRoot");

    if (prefix.length() == 0)
    {
        cerr << "Error: Need a valid Calpont configuation file" << endl;
        exit(1);
    }

    int rc = saveState(prefix);

    return rc;
}

int DBRM::saveState(string filename) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("saveState(filename)");
        TRACER_ADDSTRINPUT(filename);
        TRACER_WRITE;
    }

#endif
    string emFilename = filename + "_em";
    string vssFilename = filename + "_vss";
    string vbbmFilename = filename + "_vbbm";
    bool locked[3] = { false, false, false };

    try
    {
        vbbm->lock(VBBM::READ);
        locked[0] = true;
        vss->lock(VSS::READ);
        locked[1] = true;
        copylocks->lock(CopyLocks::READ);
        locked[2] = true;

        saveExtentMap(emFilename);
        vbbm->save(vbbmFilename);
        vss->save(vssFilename);

        copylocks->release(CopyLocks::READ);
        locked[2] = false;
        vss->release(VSS::READ);
        locked[1] = false;
        vbbm->release(VBBM::READ);
        locked[0] = false;
    }
    catch (exception& e)
    {
        if (locked[2])
            copylocks->release(CopyLocks::READ);

        if (locked[1])
            vss->release(VSS::READ);

        if (locked[0])
            vbbm->release(VBBM::READ);

        return -1;
    }

    return 0;
}

int DBRM::saveExtentMap(const string& filename) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("saveExtentMap");
        TRACER_ADDSTRINPUT(filename);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->save(filename);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }

    return 0;
}

// @bug 1055+.  New functions added for multiple files per OID enhancement.
int DBRM::lookupLocal(LBID_t lbid, VER_t verid, bool vbFlag, OID_t& oid,
                      uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum, uint32_t& fileBlockOffset) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("lookupLocal(lbid,ver,..)");
        TRACER_ADDINPUT(lbid);
        TRACER_ADDINPUT(verid);
        TRACER_ADDBOOLINPUT(vbFlag);
        TRACER_ADDOUTPUT(oid);
        TRACER_ADDSHORTOUTPUT(dbRoot);
        TRACER_ADDOUTPUT(partitionNum);
        TRACER_ADDSHORTOUTPUT(segmentNum);
        TRACER_ADDOUTPUT(fileBlockOffset);
        TRACER_WRITE;
    }

#endif
    bool locked[2] = {false, false};
    int ret;
    bool tooOld = false;

    try
    {
        if (!vbFlag)
            return em->lookupLocal(lbid, (int&)oid, dbRoot, partitionNum, segmentNum, fileBlockOffset);
        else
        {
            vbbm->lock(VBBM::READ);
            locked[0] = true;
            ret = vbbm->lookup(lbid, verid, oid, fileBlockOffset);
            vbbm->release(VBBM::READ);
            locked[0] = false;

            if (ret < 0)
            {
                vss->lock(VSS::READ);
                locked[1] = true;
                tooOld = vss->isTooOld(lbid, verid);
                vss->release(VSS::READ);
                locked[1] = false;

                if (tooOld)
                    return ERR_SNAPSHOT_TOO_OLD;
            }

            return ret;
        }
    }
    catch (exception& e)
    {
        if (locked[1])
            vss->release(VSS::READ);

        if (locked[0])
            vbbm->release(VBBM::READ);

        cerr << e.what() << endl;
        return -1;
    }
}

int DBRM::lookupLocal(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, uint32_t fileBlockOffset, LBID_t& lbid) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDINPUT(fileBlockOffset);
        TRACER_ADDOUTPUT(lbid);
        TRACER_WRITE;
    }

#endif

    try
    {
        return em->lookupLocal(oid, partitionNum, segmentNum, fileBlockOffset, lbid);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }
}

int DBRM::lookupLocal_DBroot(OID_t oid, uint32_t dbroot, uint32_t partitionNum, uint16_t segmentNum,
                             uint32_t fileBlockOffset, LBID_t& lbid) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDINPUT(fileBlockOffset);
        TRACER_ADDOUTPUT(lbid);
        TRACER_WRITE;
    }

#endif

    try
    {
        return em->lookupLocal_DBroot(oid, dbroot, partitionNum, segmentNum, fileBlockOffset, lbid);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }
}

// @bug 1055-

//------------------------------------------------------------------------------
// Lookup/return starting LBID for the specified OID, partition, segment, and
// file block offset.
//------------------------------------------------------------------------------
int DBRM::lookupLocalStartLbid(OID_t    oid,
                               uint32_t partitionNum,
                               uint16_t segmentNum,
                               uint32_t fileBlockOffset,
                               LBID_t&  lbid) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("lookupLocalStartLbid(oid,fbo,..)");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDINPUT(fileBlockOffset);
        TRACER_ADDOUTPUT(lbid);
        TRACER_WRITE;
    }

#endif

    try
    {
        return em->lookupLocalStartLbid(oid, partitionNum, segmentNum,
                                        fileBlockOffset, lbid);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }
}

int DBRM::lookup(OID_t oid, LBIDRange_v& lbidList) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("lookup(oid,range)");
        TRACER_ADDINPUT(oid);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->lookup(oid, lbidList);
        return 0;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }
}

// Casual Partitioning support
int DBRM::markExtentInvalid(const LBID_t lbid,
                            execplan::CalpontSystemCatalog::ColDataType colDataType) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("markExtentInvalid");
        TRACER_ADDINPUT(lbid);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << MARKEXTENTINVALID << (uint64_t)lbid << (uint32_t)colDataType;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::markExtentsInvalid(const vector<LBID_t>& lbids,
                             const std::vector<execplan::CalpontSystemCatalog::ColDataType>& colDataTypes) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("markExtentsInvalid");

#endif
    ByteStream command, response;
    uint8_t err;
    uint32_t size = lbids.size(), i;

    command << MARKMANYEXTENTSINVALID << size;

    for (i = 0; i < size; i++)
    {
        command << (uint64_t) lbids[i];
        command << (uint32_t) colDataTypes[i];
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::getExtentMaxMin(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getExtentMaxMin");
        TRACER_ADDINPUT(lbid);
        TRACER_ADDOUTPUT(max);
        TRACER_ADDOUTPUT(min);
        TRACER_ADDOUTPUT(seqNum);
        TRACER_WRITE;
    }

#endif

    try
    {
        int ret = em->getMaxMin(lbid, max, min, seqNum);
        return ret;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return false;
    }
}

int DBRM::setExtentMaxMin(const LBID_t lbid, const int64_t max, const int64_t min, const int32_t seqNum) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("setExtentMaxMin");
        TRACER_ADDINPUT(lbid);
        TRACER_ADDINPUT(max);
        TRACER_ADDINPUT(min);
        TRACER_ADDINPUT(seqNum);
        TRACER_WRITE;
    }

#endif
    ByteStream command, response;
    uint8_t err;

    command << SETEXTENTMAXMIN << (uint64_t)lbid << (uint64_t)max << (uint64_t)min << (uint64_t)seqNum;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;

}

// @bug 1970 - Added function below to set multiple extents casual partition info in one call.
int DBRM::setExtentsMaxMin(const CPInfoList_t& cpInfos) DBRM_THROW
{
    CPInfoList_t::const_iterator it;
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("setExtentsMaxMin");

        for (it = cpInfos.begin(); it != cpInfos.end(); it++)
        {
            TRACER_ADDINPUT(it->firstLbid);
            TRACER_ADDINPUT(it->max);
            TRACER_ADDINPUT(it->min);
            TRACER_ADDINPUT(it->seqNum);
            TRACER_WRITE;
        }
    }

#endif
    ByteStream command, response;
    uint8_t err = 0;

    if (cpInfos.size() == 0)
        return err;

    if (cpInfos.empty())
        return ERR_OK;

    command << SETMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();

    for (it = cpInfos.begin(); it != cpInfos.end(); it++)
    {
        command << (uint64_t)it->firstLbid << (uint64_t)it->max << (uint64_t)it->min << (uint32_t)it->seqNum;
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// @bug 2117 - Add function to merge Casual Partition info with current extent
// map information.
//------------------------------------------------------------------------------
int DBRM::mergeExtentsMaxMin(const CPInfoMergeList_t& cpInfos) DBRM_THROW
{
    CPInfoMergeList_t::const_iterator it;
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("updateExtentsMaxMin");

        for (it = cpInfos.begin(); it != cpInfos.end(); it++)
        {
            TRACER_ADDINPUT(it->startLbid);
            TRACER_ADDINPUT(it->max);
            TRACER_ADDINPUT(it->min);
            TRACER_ADDINPUT(it->seqNum);
            TRACER_ADDINPUT(it->type);
            TRACER_ADDINPUT(it->newExtent);
            TRACER_WRITE;
        }
    }

#endif
    ByteStream command, response;
    uint8_t err;

    command << MERGEMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();

    for (it = cpInfos.begin(); it != cpInfos.end(); it++)
    {
        command << (uint64_t)it->startLbid <<
                (uint64_t)it->max       <<
                (uint64_t)it->min       <<
                (uint32_t)it->seqNum    <<
                (uint32_t)it->type      <<
                (uint32_t)it->newExtent;
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::vssLookup(LBID_t lbid, const QueryContext& verInfo, VER_t txnID, VER_t* outVer,
                    bool* vbFlag, bool vbOnly) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("vssLookup");
        TRACER_ADDINPUT(lbid);
        TRACER_ADDINPUT(verInfo);
        TRACER_ADDINPUT(txnID);
        TRACER_ADDBOOLINPUT(vbOnly);
        TRACER_WRITE;
    }

#endif

    if (!vbOnly && vss->isEmpty())
    {
        *outVer = 0;
        *vbFlag = false;
        return -1;
    }

    bool locked = false;

    try
    {
        int rc = 0;
        vss->lock(VSS::READ);
        locked = true;
        rc = vss->lookup(lbid, verInfo, txnID, outVer, vbFlag, vbOnly);
        vss->release(VSS::READ);
        return rc;
    }
    catch (exception& e)
    {
        if (locked)
            vss->release(VSS::READ);

        cerr << e.what() << endl;
        return -1;
    }
}

int DBRM::bulkVSSLookup(const std::vector<LBID_t>& lbids, const QueryContext_vss& verInfo,
                        VER_t txnID, std::vector<VSSData>* out)
{
    uint32_t i;
    bool locked = false;

    try
    {
        out->resize(lbids.size());
        vss->lock(VSS::READ);
        locked = true;

        if (vss->isEmpty(false))
        {
            for (i = 0; i < lbids.size(); i++)
            {
                VSSData& vd = (*out)[i];
                vd.verID = 0;
                vd.vbFlag = false;
                vd.returnCode = -1;
            }
        }
        else
        {
            for (i = 0; i < lbids.size(); i++)
            {
                VSSData& vd = (*out)[i];
                vd.returnCode = vss->lookup(lbids[i], verInfo, txnID, &vd.verID, &vd.vbFlag, false);
            }
        }

        vss->release(VSS::READ);
        return 0;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
    }
    catch (...)
    {
        cerr << "bulkVSSLookup: caught an exception" << endl;
    }

    if (locked)
        vss->release(VSS::READ);

    out->clear();
    return -1;
}

VER_t DBRM::getCurrentVersion(LBID_t lbid, bool* isLocked) const
{
    bool locked = false;
    VER_t ret = 0;

    try
    {
        vss->lock(VSS::READ);
        locked = true;
        ret = vss->getCurrentVersion(lbid, isLocked);
        vss->release(VSS::READ);
        locked = false;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;

        if (locked)
            vss->release(VSS::READ);

        throw;
    }

    return ret;
}

int DBRM::bulkGetCurrentVersion(const vector<LBID_t>& lbids, vector<VER_t>* versions,
                                vector<bool>* isLocked) const
{
    bool locked = false;

    versions->resize(lbids.size());

    if (isLocked != NULL)
        isLocked->resize(lbids.size());

    try
    {
        vss->lock(VSS::READ);
        locked = true;

        if (isLocked != NULL)
        {
            bool tmp = false;

            for (uint32_t i = 0; i < lbids.size(); i++)
            {
                (*versions)[i] = vss->getCurrentVersion(lbids[i], &tmp);
                (*isLocked)[i] = tmp;
            }
        }
        else
            for (uint32_t i = 0; i < lbids.size(); i++)
                (*versions)[i] = vss->getCurrentVersion(lbids[i], NULL);

        vss->release(VSS::READ);
        locked = false;
        return 0;
    }
    catch (exception& e)
    {
        versions->clear();
        cerr << e.what() << endl;

        if (locked)
            vss->release(VSS::READ);

        return -1;
    }
}


VER_t DBRM::getHighestVerInVB(LBID_t lbid, VER_t max) const
{
    bool locked = false;
    VER_t ret = -1;

    try
    {
        vss->lock(VSS::READ);
        locked = true;
        ret = vss->getHighestVerInVB(lbid, max);
        vss->release(VSS::READ);
        locked = false;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;

        if (locked)
            vss->release(VSS::READ);

        throw;
    }

    return ret;
}

bool DBRM::isVersioned(LBID_t lbid, VER_t ver) const
{
    bool ret = false;
    bool locked = false;

    try
    {
        vss->lock(VSS::READ);
        locked = true;
        ret = vss->isVersioned(lbid, ver);
        vss->release(VSS::READ);
        locked = false;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;

        if (locked)
            vss->release(VSS::READ);

        throw;
    }

    return ret;
}

int8_t DBRM::send_recv(const ByteStream& in, ByteStream& out) throw()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("send_recv");

#endif
    bool firstAttempt = true;

    mutex.lock();

reconnect:

    if (msgClient == NULL)
        try
        {
            msgClient = MessageQueueClientPool::getInstance(masterName);
        }
        catch (exception& e)
        {
            cerr << "class DBRM failed to create a MessageQueueClient: " <<
                 e.what() << endl;
            msgClient = NULL;
            mutex.unlock();
            return ERR_NETWORK;
        }

    try
    {
        msgClient->write(in);
        out = msgClient->read();
    }
    /* If we add a timeout to the read() call, uncomment this clause
    catch (SocketClosed &e) {
    	cerr << "DBRM::send_recv: controller node closed the connection" << endl;
    	DO_ERR_NETWORK;
    }
    */
    catch (exception& e)
    {
        cerr << "DBRM::send_recv caught: " << e.what() << endl;

        if (firstAttempt)
        {
            firstAttempt = false;
            MessageQueueClientPool::releaseInstance(msgClient);
            msgClient = NULL;
            goto reconnect;
        }

        DO_ERR_NETWORK;
    }

    if (out.length() == 0)
    {
        cerr << "DBRM::send_recv: controller node closed the connection" << endl;

        if (firstAttempt)
        {
            firstAttempt = false;
            MessageQueueClientPool::releaseInstance(msgClient);
            msgClient = NULL;
            sleep(10);
            goto reconnect;
        }

        DO_ERR_NETWORK;
    }

    mutex.unlock();
    return ERR_OK;
}

//------------------------------------------------------------------------------
// Send a request to create a "stripe" of column extents for the specified
// column OIDs and DBRoot.
//------------------------------------------------------------------------------
int DBRM::createStripeColumnExtents(
    const std::vector<CreateStripeColumnExtentsArgIn>& cols,
    uint16_t  dbRoot,
    uint32_t& partitionNum,
    uint16_t& segmentNum,
    std::vector<CreateStripeColumnExtentsArgOut>& extents) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("createStripeColumnExtents");
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t  err;
    uint16_t tmp16;
    uint32_t tmp32;

    command << CREATE_STRIPE_COLUMN_EXTENTS;
    serializeInlineVector(command, cols);
    command << dbRoot << partitionNum;

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    try
    {
        response >> err;

        if (err != 0)
            return (int) err;

        response >> tmp32;
        partitionNum = tmp32;
        response >> tmp16;
        segmentNum = tmp16;
        deserializeInlineVector(response, extents);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_FAILURE;
    }

    CHECK_EMPTY(response);
    return 0;
}

//------------------------------------------------------------------------------
// Send a request to create a column extent for the specified OID and DBRoot.
//------------------------------------------------------------------------------
int DBRM::createColumnExtent_DBroot(OID_t oid,
                                    uint32_t  colWidth,
                                    uint16_t  dbRoot,
                                    uint32_t& partitionNum,
                                    uint16_t& segmentNum,
                                    execplan::CalpontSystemCatalog::ColDataType colDataType,
                                    LBID_t&    lbid,
                                    int&       allocdSize,
                                    uint32_t& startBlockOffset) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("createColumnExtent_DBroot");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(colWidth);
        TRACER_ADDSHORTINPUT(dbRoot);
        TRACER_ADDOUTPUT(partitionNum);
        TRACER_ADDSHORTOUTPUT(segmentNum);
        TRACER_ADDINT64OUTPUT(lbid);
        TRACER_ADDOUTPUT(allocdSize);
        TRACER_ADDOUTPUT(startBlockOffset);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t  err;
    uint32_t tmp8 = (uint8_t)colDataType;
    uint16_t tmp16;
    uint32_t tmp32;
    uint64_t tmp64;

    command << CREATE_COLUMN_EXTENT_DBROOT << (ByteStream::quadbyte) oid <<
            colWidth << dbRoot << partitionNum << segmentNum << tmp8;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    try
    {
        response >> err;

        if (err != 0)
            return (int) err;

        response >> tmp32;
        partitionNum = tmp32;
        response >> tmp16;
        segmentNum = tmp16;
        response >> tmp64;
        lbid = (int64_t)tmp64;
        response >> tmp32;
        allocdSize = (int32_t)tmp32;
        response >> tmp32;
        startBlockOffset = (int32_t)tmp32;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_FAILURE;
    }

    CHECK_EMPTY(response);
    return 0;
}

//------------------------------------------------------------------------------
// Send a request to create a column extent for the exact segment file
// specified by the requested OID, DBRoot, partition, and segment.
//------------------------------------------------------------------------------
int DBRM::createColumnExtentExactFile(OID_t oid,
                                      uint32_t  colWidth,
                                      uint16_t  dbRoot,
                                      uint32_t partitionNum,
                                      uint16_t segmentNum,
                                      execplan::CalpontSystemCatalog::ColDataType colDataType,
                                      LBID_t&    lbid,
                                      int&       allocdSize,
                                      uint32_t& startBlockOffset) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("createColumnExtentExactFile");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(colWidth);
        TRACER_ADDSHORTINPUT(dbRoot);
        TRACER_ADDOUTPUT(partitionNum);
        TRACER_ADDSHORTOUTPUT(segmentNum);
        TRACER_ADDINT64OUTPUT(lbid);
        TRACER_ADDOUTPUT(allocdSize);
        TRACER_ADDOUTPUT(startBlockOffset);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t  err;
    uint8_t  tmp8;
    uint16_t tmp16;
    uint32_t tmp32;
    uint64_t tmp64;

    tmp8 = (uint8_t)colDataType;
    command << CREATE_COLUMN_EXTENT_EXACT_FILE << (ByteStream::quadbyte) oid <<
            colWidth << dbRoot << partitionNum << segmentNum << tmp8;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    try
    {
        response >> err;

        if (err != 0)
            return (int) err;

        response >> tmp32;
        partitionNum = tmp32;
        response >> tmp16;
        segmentNum = tmp16;
        response >> tmp64;
        lbid = (int64_t)tmp64;
        response >> tmp32;
        allocdSize = (int32_t)tmp32;
        response >> tmp32;
        startBlockOffset = (int32_t)tmp32;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_FAILURE;
    }

    CHECK_EMPTY(response);
    return 0;
}

//------------------------------------------------------------------------------
// Send a request to create a dictionary store extent.
//------------------------------------------------------------------------------
int DBRM::createDictStoreExtent(OID_t oid,
                                uint16_t  dbRoot,
                                uint32_t  partitionNum,
                                uint16_t  segmentNum,
                                LBID_t&    lbid,
                                int&       allocdSize) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("createDictStoreExtent");
        TRACER_ADDINPUT(oid);
        TRACER_ADDSHORTINPUT(dbRoot);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDINT64OUTPUT(lbid);
        TRACER_ADDOUTPUT(allocdSize);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t  err;
    uint32_t tmp32;
    uint64_t tmp64;

    command << CREATE_DICT_STORE_EXTENT << (ByteStream::quadbyte) oid << dbRoot <<
            partitionNum << segmentNum;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    try
    {
        response >> err;

        if (err != 0)
            return (int) err;

        response >> tmp64;
        lbid = (int64_t)tmp64;
        response >> tmp32;
        allocdSize = (int32_t)tmp32;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_FAILURE;
    }

    CHECK_EMPTY(response);
    return 0;
}

//------------------------------------------------------------------------------
// Send a request to delete a set extents for the specified column OID and
// DBRoot, and to return the extents to the free list.  HWMs for the last
// stripe of extents in the specified DBRoot are updated accordingly.
//------------------------------------------------------------------------------
int DBRM::rollbackColumnExtents_DBroot(OID_t oid,
                                       bool       bDeleteAll,
                                       uint16_t  dbRoot,
                                       uint32_t  partitionNum,
                                       uint16_t  segmentNum,
                                       HWM_t      hwm) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("rollbackColumnExtents");
        TRACER_ADDINPUT(oid);
        TRACER_ADDBOOLINPUT(bDeleteAll);
        TRACER_ADDSHORTINPUT(dbRoot);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDINPUT(hwm);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << ROLLBACK_COLUMN_EXTENTS_DBROOT << (ByteStream::quadbyte) oid <<
            (uint8_t)bDeleteAll << dbRoot << partitionNum <<
            segmentNum << hwm;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// Send a request to delete a set of extents for the specified dictionary store
// OID and DBRoot, and to return the extents to the free list.  HWMs for the
// last stripe of extents are updated accordingly.
//------------------------------------------------------------------------------
int DBRM::rollbackDictStoreExtents_DBroot(OID_t oid,
        uint16_t            dbRoot,
        uint32_t            partitionNum,
        const vector<uint16_t>& segNums,
        const vector<HWM_t>& hwms) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("rollbackDictStoreExtents");
        TRACER_ADDINPUT(oid);
        TRACER_ADDSHORTINPUT(dbRoot);
        TRACER_ADDINPUT(partitionNum);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << ROLLBACK_DICT_STORE_EXTENTS_DBROOT <<
            (ByteStream::quadbyte) oid <<
            dbRoot << partitionNum;
    serializeVector(command, segNums);
    serializeVector(command, hwms);
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::deleteEmptyColExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("deleteEmptyColExtents");
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;
    uint32_t size = extentsInfo.size();
    command << DELETE_EMPTY_COL_EXTENTS;
    command << size;

    for ( unsigned i = 0; i < extentsInfo.size(); i++)
    {
        command << (ByteStream::quadbyte) extentsInfo[i].oid;
        command << extentsInfo[i].partitionNum;
        command << extentsInfo[i].segmentNum;
        command << extentsInfo[i].dbRoot;
        command << extentsInfo[i].hwm;
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::deleteEmptyDictStoreExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("deleteEmptyDictStoreExtents");
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;
    uint32_t size = extentsInfo.size();
    command << DELETE_EMPTY_DICT_STORE_EXTENTS;
    command << size;

    for ( unsigned i = 0; i < extentsInfo.size(); i++)
    {
        command << (ByteStream::quadbyte) extentsInfo[i].oid;
        command << extentsInfo[i].partitionNum;
        command << extentsInfo[i].segmentNum;
        command << extentsInfo[i].dbRoot;
        command << extentsInfo[i].hwm;
        command << (uint8_t)extentsInfo[i].newFile;
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::deleteOID(OID_t oid) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("deleteOID");
        TRACER_ADDINPUT(oid);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << DELETE_OID << (ByteStream::quadbyte) oid;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);

    try
    {
        deleteAISequence(oid);
    }
    catch (...) { }   // an error here means a network problem, will be caught elsewhere

    return err;
}

int DBRM::deleteOIDs(const std::vector<OID_t>& oids) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("deleteOIDs");
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;
    uint32_t size = oids.size();
    command << DELETE_OIDS;
    command << size;

    for ( unsigned i = 0; i < oids.size(); i++)
    {
        command << (ByteStream::quadbyte) oids[i];
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);

    try
    {
        for (uint32_t i = 0; i < oids.size(); i++)
            deleteAISequence(oids[i]);
    }
    catch (...) { }   // an error here means a network problem, will be caught elsewhere

    return err;
}

//------------------------------------------------------------------------------
// Return the last local HWM for the specified OID and DBroot. The corresponding
// partition number, and segment number are returned as well.  This function
// can be used by cpimport for example to find out where the current "end-of-
// data" is, so that cpimport will know where to begin adding new rows.
// If no available or outOfService extent is found, then bFound is returned
// as false.
//------------------------------------------------------------------------------
int DBRM::getLastHWM_DBroot(int oid, uint16_t dbRoot, uint32_t& partitionNum,
                            uint16_t& segmentNum, HWM_t& hwm,
                            int& status, bool& bFound) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getLastHWM_DBroot");
        TRACER_ADDINPUT(oid);
        TRACER_ADDSHORTOUTPUT(dbRoot);
        TRACER_ADDOUTPUT(partitionNum);
        TRACER_ADDSHORTOUTPUT(segmentNum);
        TRACER_ADDOUTPUT(hwm);
        TRACER_ADDOUTPUT(status);
        TRACER_WRITE;
    }

#endif

    try
    {
        hwm = em->getLastHWM_DBroot(oid, dbRoot, partitionNum, segmentNum,
                                    status, bFound);
    }
    catch (exception& e)
    {
        return ERR_FAILURE;
    }

    return ERR_OK;
}

//------------------------------------------------------------------------------
// Return the HWM for the specified OID, partition number, and segment number.
// This is used to get the HWM for a particular dictionary segment store file,
// or a specific column segment file.
//------------------------------------------------------------------------------
int DBRM::getLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum,
                      HWM_t& hwm, int& status) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getLocalHWM");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDOUTPUT(hwm);
        TRACER_ADDOUTPUT(status);
        TRACER_WRITE;
    }

#endif

    try
    {
        hwm = em->getLocalHWM(oid, partitionNum, segmentNum, status);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_FAILURE;
    }

    return ERR_OK;
}

//------------------------------------------------------------------------------
// Set the local HWM for the file referenced by the specified OID, partition
// number, and segment number.
//------------------------------------------------------------------------------
int DBRM::setLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum,
                      HWM_t hwm) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("setLocalHWM");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDINPUT(hwm);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << SET_LOCAL_HWM << (ByteStream::quadbyte) oid << partitionNum << segmentNum << hwm;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::bulkSetHWM(const vector<BulkSetHWMArg>& v, VER_t transID) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("bulkSetHWM");
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << BULK_SET_HWM;
    serializeInlineVector(command, v);
    command << (uint32_t) transID;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::bulkSetHWMAndCP(const vector<BulkSetHWMArg>& v, const vector<CPInfo>& setCPDataArgs,
                          const vector<CPInfoMerge>& mergeCPDataArgs, VER_t transID) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("bulkSetHWMAndCP");
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << BULK_SET_HWM_AND_CP;
    serializeInlineVector(command, v);
    serializeInlineVector(command, setCPDataArgs);
    serializeInlineVector(command, mergeCPDataArgs);
    command << (uint32_t) transID;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::bulkUpdateDBRoot(const vector<BulkUpdateDBRootArg>& args)
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("bulkUpdateDBRoot");
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << BULK_UPDATE_DBROOT;
    serializeInlineVector(command, args);
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}


//------------------------------------------------------------------------------
// For the specified OID and PM number, this function will return a vector
// of objects carrying HWM info (for the last segment file) and block count
// information about each DBRoot assigned to the specified PM.
//------------------------------------------------------------------------------
int DBRM::getDbRootHWMInfo(OID_t oid, uint16_t pmNumber,
                           EmDbRootHWMInfo_v& emDBRootHwmInfos) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getDbRootHWMInfo");
        TRACER_ADDINPUT(oid);
        TRACER_ADDSHORTINPUT(pmNumber);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->getDbRootHWMInfo(oid, pmNumber, emDBRootHwmInfos);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_FAILURE;
    }

    return ERR_OK;
}

//------------------------------------------------------------------------------
// Return the status or state of the extents in the segment file specified
// by the arguments: oid, partitionNum, and segment Num.
//------------------------------------------------------------------------------
int DBRM::getExtentState(OID_t oid, uint32_t partitionNum,
                         uint16_t segmentNum, bool& bFound, int& status) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getExtentState");
        TRACER_ADDINPUT(oid);
        TRACER_ADDINPUT(partitionNum);
        TRACER_ADDSHORTINPUT(segmentNum);
        TRACER_ADDOUTPUT(status);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->getExtentState(oid, partitionNum,
                           segmentNum, bFound, status);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_FAILURE;
    }

    return ERR_OK;
}

// dmc-should eventually deprecate
int DBRM::getExtentSize() throw()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("getExtentSize");

#endif
    return em->getExtentSize();
}

unsigned DBRM::getExtentRows() throw()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("getExtentRows");

#endif
    return em->getExtentRows();
}

int DBRM::getExtents(int OID, std::vector<struct EMEntry>& entries,
                     bool sorted, bool notFoundErr, bool incOutOfService) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getExtents");
        TRACER_ADDINPUT(OID);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->getExtents(OID, entries, sorted, notFoundErr, incOutOfService);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }

    return 0;
}

int DBRM::getExtents_dbroot(int OID, std::vector<struct EMEntry>& entries,
                            const uint16_t dbroot) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getExtents_dbroot");
        TRACER_ADDINPUT(OID);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->getExtents_dbroot(OID, entries, dbroot);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }

    return 0;
}

//------------------------------------------------------------------------------
// Return the number of extents for the specified OID and DBRoot.
// Any out-of-service extents can optionally be included or excluded.
//------------------------------------------------------------------------------
int DBRM::getExtentCount_dbroot(int OID, uint16_t dbroot,
                                bool incOutOfService, uint64_t& numExtents) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getExtentCount_dbroot");
        TRACER_ADDINPUT(OID);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->getExtentCount_dbroot(OID, dbroot, incOutOfService, numExtents);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }

    return 0;
}

//------------------------------------------------------------------------------
// Gets the DBRoot for the specified system catalog OID.
// Function assumes the specified System Catalog OID is fully contained on
// a single DBRoot, as the function only searches for and returns the first
// DBRoot entry that is found in the extent map.
//------------------------------------------------------------------------------
int DBRM::getSysCatDBRoot(OID_t oid, uint16_t& dbRoot) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getSysCatDBRoot");
        TRACER_ADDINPUT(oid);
        TRACER_ADDSHORTOUTPUT(dbRoot);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->getSysCatDBRoot(oid, dbRoot);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }

    return 0;
}

//------------------------------------------------------------------------------
// Delete all extents for the specified OID(s) and partition number.
//------------------------------------------------------------------------------
int DBRM::deletePartition(const std::vector<OID_t>& oids,
                          const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITENOW("deletePartition");
        std::ostringstream oss;
        oss << "partitionNum: "
            std::set<LogicalPartition>::const_iterator partIt;

        for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
            oss << (*it) << " "
                oss << "; OIDS: ";

        std::vector<OID_t>::const_iterator it;

        for (it = oids.begin(); it != oids.end(); ++it)
        {
            oss << (*it) << ", ";
        }

        TRACER_WRITEDIRECT(oss.str());
    }

#endif

    ByteStream command, response;
    uint8_t err;
    command << DELETE_PARTITION;
    serializeSet<LogicalPartition>(command, partitionNums);
    uint32_t oidSize = oids.size();
    command << oidSize;

    for ( unsigned i = 0; i < oidSize; i++)
        command << (ByteStream::quadbyte) oids[i];

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;

    if (err != 0)
        response >> emsg;

    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// Mark all extents as out of service, for the specified OID(s) and partition
// number.
//------------------------------------------------------------------------------
int DBRM::markPartitionForDeletion(const std::vector<OID_t>& oids,
                                   const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITENOW("markPartitionForDeletion");
        std::ostringstream oss;
        oss << "partitionNum: "
            std::set<LogicalPartition>::const_iterator partIt;

        for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
            oss << (*it) << " "
                oss << "; OIDS: ";

        std::vector<OID_t>::const_iterator it;

        for (it = oids.begin(); it != oids.end(); ++it)
        {
            oss << (*it) << ", ";
        }

        TRACER_WRITEDIRECT(oss.str());
    }

#endif

    ByteStream command, response;
    uint8_t err;
    command << MARK_PARTITION_FOR_DELETION;
    serializeSet<LogicalPartition>(command, partitionNums);
    uint32_t oidSize = oids.size();
    command << oidSize;

    for ( unsigned i = 0; i < oidSize; i++)
        command << (ByteStream::quadbyte) oids[i];

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;

    if (err)
        response >> emsg;

    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// Mark all extents as out of service, for the specified OID(s)
//------------------------------------------------------------------------------
int DBRM::markAllPartitionForDeletion(const std::vector<OID_t>& oids) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITENOW("markAllPartitionForDeletion");
        std::ostringstream oss;
        oss << "OIDS: ";
        std::vector<OID_t>::const_iterator it;

        for (it = oids.begin(); it != oids.end(); ++it)
        {
            oss << (*it) << ", ";
        }

        TRACER_WRITEDIRECT(oss.str());
    }

#endif

    ByteStream command, response;
    uint8_t err;
    uint32_t size = oids.size();

    command << MARK_ALL_PARTITION_FOR_DELETION << size;

    for ( unsigned i = 0; i < size; i++)
    {
        command << (ByteStream::quadbyte) oids[i];
    }

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// Restore all extents for the specified OID(s) and partition number.
//------------------------------------------------------------------------------
int DBRM::restorePartition(const std::vector<OID_t>& oids,
                           const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITENOW("restorePartition");
        std::ostringstream oss;
        oss << "partitionNum: "
            std::set<LogicalPartition>::const_iterator partIt;

        for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
            oss << (*it) << " "
                oss << "; OIDS: ";

        std::vector<OID_t>::const_iterator it;

        for (it = oids.begin(); it != oids.end(); ++it)
        {
            oss << (*it) << ", ";
        }

        TRACER_WRITEDIRECT(oss.str());
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << RESTORE_PARTITION;
    serializeSet<LogicalPartition>(command, partitionNums);
    uint32_t oidSize = oids.size();
    command << oidSize;

    for ( unsigned i = 0; i < oidSize; i++)
        command << (ByteStream::quadbyte) oids[i];

    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;

    if (err)
        response >> emsg;

    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// Return all the out-of-service partitions for the specified OID.
//------------------------------------------------------------------------------
int DBRM::getOutOfServicePartitions(OID_t oid,
                                    std::set<LogicalPartition>& partitionNums) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getOutOfServicePartitions");
        TRACER_ADDINPUT(oid);
        TRACER_WRITE;
    }

#endif

    try
    {
        em->getOutOfServicePartitions(oid, partitionNums);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }

    return ERR_OK;
}

//------------------------------------------------------------------------------
// Delete all extents for the specified DBRoot
//------------------------------------------------------------------------------
int DBRM::deleteDBRoot(uint16_t dbroot) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITENOW("deleteDBRoot");
        std::ostringstream oss;
        oss << "DBRoot: " << dbroot;
        TRACER_WRITEDIRECT(oss.str());
    }

#endif

    ByteStream command, response;
    uint8_t err;
    command << DELETE_DBROOT;
    uint32_t q = static_cast<uint32_t>(dbroot);
    command << q;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

//------------------------------------------------------------------------------
// Does the specified DBRoot have any extents.
// Returns an error if extentmap shared memory is not loaded.
//------------------------------------------------------------------------------
int DBRM::isDBRootEmpty(uint16_t dbroot,
                        bool& isEmpty, std::string& errMsg) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("isDBRootEmpty");
        TRACER_ADDINPUT(dbroot);
        TRACER_WRITE;
    }

#endif

    errMsg.clear();

    try
    {
        isEmpty = em->isDBRootEmpty(dbroot);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        errMsg = e.what();
        return ERR_FAILURE;
    }

    return ERR_OK;
}

int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID,
                       uint32_t vbFBO) DBRM_THROW
{

#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("writeVBEntry");
        TRACER_ADDINPUT(transID);
        TRACER_ADDINPUT(lbid);
        TRACER_ADDINPUT(vbOID);
        TRACER_ADDINPUT(vbFBO);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << WRITE_VB_ENTRY << (uint32_t) transID << (uint64_t) lbid << (uint32_t) vbOID << vbFBO;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

struct _entry
{
    _entry(LBID_t l) : lbid(l) { };
    LBID_t lbid;
    inline bool operator<(const _entry& e) const
    {
        return ((e.lbid >> 10) < (lbid >> 10));
    }
};

int DBRM::getDBRootsForRollback(VER_t transID, vector<uint16_t>* dbroots) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getDBRootsForRollback");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif
    bool locked[2] = {false, false};
    set<OID_t> vbOIDs;
    set<OID_t>::iterator vbIt;
    vector<LBID_t> lbidList;
    uint32_t i, size;
    uint32_t tmp32;
    OID_t vbOID;
    int err;

    set<_entry> lbidPruner;
    set<_entry>::iterator it;

    try
    {
        vbbm->lock(VBBM::READ);
        locked[0] = true;
        vss->lock(VSS::READ);
        locked[1] = true;

        vss->getUncommittedLBIDs(transID, lbidList);

        // prune the list; will leave at most 1 entry per 1024-lbid range
        for (i = 0, size = lbidList.size(); i < size; i++)
            lbidPruner.insert(_entry(lbidList[i]));

        // get the VB oids
        for (it = lbidPruner.begin(); it != lbidPruner.end(); ++it)
        {
            err = vbbm->lookup(it->lbid, transID, vbOID, tmp32);

            if (err)   // this error will be caught by DML; more appropriate to handle it there
                continue;

            vbOIDs.insert(vbOID);
        }

        // get the dbroots
        for (vbIt = vbOIDs.begin(); vbIt != vbOIDs.end(); ++vbIt)
        {
            err = getDBRootOfVBOID(*vbIt);

            if (err)
            {
                ostringstream os;
                os << "DBRM::getDBRootOfVBOID() returned an error looking for vbOID " << *vbIt;
                log(os.str());
                return ERR_FAILURE;
            }

            dbroots->push_back((uint16_t) err);
        }

        vss->release(VSS::READ);
        locked[1] = false;
        vbbm->release(VBBM::READ);
        locked[0] = false;

        return ERR_OK;
    }
    catch (exception& e)
    {
        if (locked[0])
            vbbm->release(VBBM::READ);

        if (locked[1])
            vss->release(VSS::READ);

        return -1;
    }

}

int DBRM::getUncommittedLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getUncommittedLBIDs");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif
    bool locked = false;

    try
    {
        vss->lock(VSS::READ);
        locked = true;

        vss->getUncommittedLBIDs(transID, lbidList);

        vss->release(VSS::READ);
        locked = false;
        return 0;
    }
    catch (exception& e)
    {
        if (locked)
            vss->release(VSS::READ);

        return -1;
    }
}

// @bug 1509.  New function that returns one LBID per extent touched as part of the transaction.  Used to get a list
// of blocks to use for updating casual partitioning when the transaction is committed.
int DBRM::getUncommittedExtentLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getUncommittedExtentLBIDs");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif
    bool locked = false;
    vector<LBID_t>::iterator lbidIt;
    typedef pair<int64_t, int64_t> range_t;
    range_t range;
    vector<range_t> ranges;
    vector<range_t>::iterator rangeIt;

    try
    {
        vss->lock(VSS::READ);
        locked = true;

        // Get a full list of uncommitted LBIDs related to this transactin.
        vss->getUncommittedLBIDs(transID, lbidList);

        vss->release(VSS::READ);
        locked = false;

        if (lbidList.size() > 0)
        {

            // Sort the vector.
            std::sort<vector<LBID_t>::iterator>(lbidList.begin(), lbidList.end());

            // Get the LBID range for the first block in the list.
            lbidIt = lbidList.begin();

            if (em->lookup(*lbidIt, range.first, range.second) < 0)
            {
                return -1;
            }

            ranges.push_back(range);

            // Loop through the LBIDs and add the new ranges.
            ++lbidIt;

            while (lbidIt != lbidList.end())
            {
                if (*lbidIt > range.second)
                {
                    if (em->lookup(*lbidIt, range.first, range.second) < 0)
                    {
                        return -1;
                    }

                    ranges.push_back(range);
                }

                ++lbidIt;
            }

            // Reset the lbidList and return only the first LBID in each extent that was changed
            // in the transaction.
            lbidList.clear();

            for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++)
            {
                lbidList.push_back(rangeIt->first);
            }
        }

        return 0;
    }
    catch (exception& e)
    {
        if (locked)
            vss->release(VSS::READ);

        return -1;
    }
}


int DBRM::beginVBCopy(VER_t transID, uint16_t dbRoot, const LBIDRange_v& ranges,
                      VBRange_v& freeList) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("beginVBCopy");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << BEGIN_VB_COPY << (ByteStream::quadbyte) transID << dbRoot;
    serializeVector<LBIDRange>(command, ranges);
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    try
    {
        response >> err;

        if (err != 0)
            return err;

        deserializeVector(response, freeList);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return ERR_NETWORK;
    }

    CHECK_EMPTY(response);
    return 0;
}

int DBRM::endVBCopy(VER_t transID, const LBIDRange_v& ranges)
DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("endVBCopy");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << END_VB_COPY << (ByteStream::quadbyte) transID;
    serializeVector(command, ranges);
    err = send_recv(command, response);

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::vbCommit(VER_t transID) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("vbCommit");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << VB_COMMIT << (ByteStream::quadbyte) transID;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::vbRollback(VER_t transID, const LBIDRange_v& lbidList) DBRM_THROW
{

#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("vbRollback ");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << VB_ROLLBACK1 << (ByteStream::quadbyte) transID;
    serializeVector(command, lbidList);
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::vbRollback(VER_t transID, const vector<LBID_t>& lbidList) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("vbRollback");
        TRACER_ADDINPUT(transID);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << VB_ROLLBACK2 << (ByteStream::quadbyte) transID;
    serializeVector(command, lbidList);
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::halt() DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("halt");

#endif
    ByteStream command, response;
    uint8_t err;

    command << HALT;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::resume() DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("resume");

#endif
    ByteStream command, response;
    uint8_t err;

    command << RESUME;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::forceReload() DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("forceReload");

#endif
    ByteStream command, response;
    uint8_t err;

    command << RELOAD;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::setReadOnly(bool b) DBRM_THROW
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("setReadOnly");
        TRACER_ADDBOOLINPUT(b);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << (b ? SETREADONLY : SETREADWRITE);
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::isReadWrite() throw()
{
#ifdef BRM_INFO

    if (fDebug)  TRACER_WRITENOW("isReadWrite");

#endif
    ByteStream command, response;
    uint8_t err;

    command << GETREADONLY;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    //CHECK_EMPTY(response);
    return (err == 0 ? ERR_OK : ERR_READONLY);
}

int DBRM::dmlLockLBIDRanges(const vector<LBIDRange>& ranges, int txnID)
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("clear");

#endif
    ByteStream command, response;
    uint8_t err;

    command << LOCK_LBID_RANGES;
    serializeVector<LBIDRange>(command, ranges);
    command << (uint32_t) txnID;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::dmlReleaseLBIDRanges(const vector<LBIDRange>& ranges)
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("clear");

#endif
    ByteStream command, response;
    uint8_t err;

    command << RELEASE_LBID_RANGES;
    serializeVector<LBIDRange>(command, ranges);
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::clear() DBRM_THROW
{
    ByteStream command, response;
    uint8_t err;

    command << BRM_CLEAR;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() != 1)
        return ERR_NETWORK;

    response >> err;
    CHECK_EMPTY(response);
    return err;
}

int DBRM::checkConsistency() throw()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("checkConsistency");

#endif
    bool locked[2] = {false, false};

    try
    {
        em->checkConsistency();
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        return -1;
    }

    try
    {
        vbbm->lock(VBBM::READ);
        locked[0] = true;
        vss->lock(VSS::READ);
        locked[1] = true;
        vss->checkConsistency(*vbbm, *em);
        vss->release(VSS::READ);
        locked[1] = false;
        vbbm->release(VBBM::READ);
        locked[0] = false;
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;

        if (locked[1])
            vss->release(VSS::READ);

        if (locked[0])
            vbbm->release(VBBM::READ);

        return -1;
    }

    try
    {
        vbbm->lock(VBBM::READ);
        vbbm->checkConsistency();
        vbbm->release(VBBM::READ);
    }
    catch (exception& e)
    {
        cerr << e.what() << endl;
        vbbm->release(VBBM::READ);
        return -1;
    }

    return 0;
}

int DBRM::getCurrentTxnIDs(set<VER_t>& txnList) throw()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("getCurrentTxnIDs");

#endif
    bool locked[2] = { false, false };

    try
    {
        txnList.clear();
        vss->lock(VSS::READ);
        locked[0] = true;
        copylocks->lock(CopyLocks::READ);
        locked[1] = true;
        copylocks->getCurrentTxnIDs(txnList);
        vss->getCurrentTxnIDs(txnList);
        copylocks->release(CopyLocks::READ);
        locked[1] = false;
        vss->release(VSS::READ);
        locked[0] = false;
    }
    catch (exception& e)
    {
        if (locked[1])
            copylocks->release(CopyLocks::READ);

        if (locked[0])
            vss->release(VSS::READ);

        cerr << e.what() << endl;
        return -1;
    }

    return 0;
}

const QueryContext DBRM::verID()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("verID");

#endif
    ByteStream command, response;
    uint8_t err;
    QueryContext ret;

    command << VER_ID;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: SessionManager::verID(): network error" << endl;
        ret.currentScn = -1;
        return ret;
    }

    try
    {
        response >> err;
        response >> ret;
        CHECK_EMPTY(response);
    }
    catch (exception& e)
    {
        cerr << "DBRM: SessionManager::verID(): bad response" << endl;
        log("DBRM: SessionManager::verID(): bad response", logging::LOG_TYPE_WARNING);
        ret.currentScn = -1;
    }

    return ret;
}

const QueryContext DBRM::sysCatVerID()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("sysCatVerID");

#endif
    ByteStream command, response;
    uint8_t err;
    QueryContext ret;

    command << SYSCAT_VER_ID;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: SessionManager::sysCatVerID(): network error" << endl;
        ret.currentScn = -1;
        return ret;
    }

    try
    {
        response >> err;
        response >> ret;
        CHECK_EMPTY(response);
    }
    catch (exception& e)
    {
        cerr << "DBRM: SessionManager::sysCatVerID(): bad response" << endl;
        log("DBRM: SessionManager::sysCatVerID(): bad response", logging::LOG_TYPE_WARNING);
        ret.currentScn = -1;
    }

    return ret;
}


const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block,
                           bool isDDL)
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("newTxnID");
        TRACER_ADDINPUT(session);
        TRACER_ADDBOOLINPUT(block);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err, tmp;
    uint32_t tmp32;
    TxnID ret;

    command << NEW_TXN_ID << session << (uint8_t) block << (uint8_t)isDDL;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: SessionManager::newTxnID(): network error");
        ret.valid = false;
        return ret;
    }

    if (response.length() != 6)
    {
        log("DBRM: SessionManager::newTxnID(): bad response");
        ret.valid = false;
        return ret;
    }

    response >> err;
    response >> tmp32;
    ret.id = tmp32;
    response >> tmp;
    ret.valid = (tmp == 0 ? false : true);
    CHECK_EMPTY(response);
    return ret;
}

void DBRM::committed(TxnID& txnid)
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("committed");
        TRACER_ADDINPUT(txnid);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err;

    command << COMMITTED << (uint32_t) txnid.id << (uint8_t) txnid.valid;
    err = send_recv(command, response);
    txnid.valid = false;

    if (err != ERR_OK)
        log("DBRM: error: SessionManager::committed() failed");
    else if (response.length() != 1)
        log("DBRM: error: SessionManager::committed() failed (bad response)",
            logging::LOG_TYPE_ERROR);

    response >> err;

    if (err != ERR_OK)
        log("DBRM: error: SessionManager::committed() failed (valid error code)",
            logging::LOG_TYPE_ERROR);

}


void DBRM::rolledback(TxnID& txnid)
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("rolledback");
        TRACER_ADDINPUT(txnid);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err, tmp;

    command << ROLLED_BACK << (uint32_t) txnid.id << (uint8_t) txnid.valid;
    err = send_recv(command, response);
    txnid.valid = false;

    if (err != ERR_OK)
        log("DBRM: error: SessionManager::rolledback() failed (network)");
    else if (response.length() != 1)
        log("DBRM: error: SessionManager::rolledback() failed (bad response)",
            logging::LOG_TYPE_ERROR);

    response >> tmp;

    if (tmp != ERR_OK)
    {
        if (getSystemReady() != 0 )
            log("DBRM: error: SessionManager::rolledback() failed (valid error code)",
                logging::LOG_TYPE_ERROR);
    }
}

int DBRM::getUnlockedLBIDs(BlockList_t* list) DBRM_THROW
{
    bool locked = false;

    list->clear();

    try
    {
        vss->lock(VSS::READ);
        locked = true;
        vss->getUnlockedLBIDs(*list);
        vss->release(VSS::READ);
        locked = false;
        return 0;
    }
    catch (exception& e)
    {
        if (locked)
            vss->release(VSS::READ);

        cerr << e.what() << endl;
        return -1;
    }
}


const TxnID DBRM::getTxnID
(const SessionManagerServer::SID session)
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("getTxnID");
        TRACER_ADDINPUT(session);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err, tmp8;
    uint32_t tmp32;
    TxnID ret;

    command << GET_TXN_ID << (uint32_t) session;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: error: SessionManager::getTxnID() failed (network)", logging::LOG_TYPE_ERROR);
        ret.valid = false;
        return ret;
    }

    response >> err;

    if (err != ERR_OK)
    {
        log("DBRM: error: SessionManager::getTxnID() failed (got an error)",
            logging::LOG_TYPE_ERROR);
        ret.valid = false;
        return ret;
    }

    response >> tmp32 >> tmp8;
    ret.id = tmp32;
    ret.valid = tmp8;
    return ret;
}

boost::shared_array<SIDTIDEntry> DBRM::SIDTIDMap(int& len)
{
#ifdef BRM_INFO

    if (fDebug)
    {
        TRACER_WRITELATER("SIDTIDMap");
        TRACER_ADDOUTPUT(len);
        TRACER_WRITE;
    }

#endif

    ByteStream command, response;
    uint8_t err, tmp8;
    uint32_t tmp32;
    int i;
    boost::shared_array<SIDTIDEntry> ret;

    command << SID_TID_MAP;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: error: SessionManager::SIDTIDEntry() failed (network)");
        return ret;
    }

    response >> err;

    if (err != ERR_OK)
    {
        log("DBRM: error: SessionManager::SIDTIDEntry() failed (valid error code)",
            logging::LOG_TYPE_ERROR);
        return ret;
    }

    response >> tmp32;
    len = (int) tmp32;
    ret.reset(new SIDTIDEntry[len]);

    for (i = 0; i < len; i++)
    {
        response >> tmp32 >> tmp8;
        ret[i].txnid.id = tmp32;
        ret[i].txnid.valid = (tmp8 == 0 ? false : true);
        response >> tmp32;
        ret[i].sessionid = tmp32;
    }

    CHECK_EMPTY(response);
    return ret;
}

const uint32_t DBRM::getUnique32()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("getUnique32");

#endif

    ByteStream command, response;
    uint8_t err;
    uint32_t ret;

    command << GET_UNIQUE_UINT32;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: getUnique32() failed (network)\n";
        log("DBRM: getUnique32() failed (network)", logging::LOG_TYPE_ERROR);
        throw runtime_error("DBRM: getUnique32() failed check the controllernode");
        return 0;
    }

    /* Some jobsteps don't need the connection after this so close it to free up
    resources on the controller node */
    /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
    remove the client. Plus, it may cause weird network issue when the socket is being
    released and re-established very quickly*/
    //pthread_mutex_lock(&mutex);
    //delete msgClient;
    //msgClient = NULL;
    //pthread_mutex_unlock(&mutex);

    response >> err;

    if (err != ERR_OK)
    {
        cerr << "DBRM: getUnique32() failed (got an error)\n";
        log("DBRM: getUnique32() failed (got an error)",
            logging::LOG_TYPE_ERROR);
        throw runtime_error("DBRM: getUnique32() failed check the controllernode");
        return 0;
    }

    response >> ret;
// 	cerr << "DBRM returning " << ret << endl;
    return ret;
}

const uint64_t DBRM::getUnique64()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("getUnique64");

#endif

    ByteStream command, response;
    uint8_t err;
    uint64_t ret;

    command << GET_UNIQUE_UINT64;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: getUnique64() failed (network)\n";
        log("DBRM: getUnique64() failed (network)", logging::LOG_TYPE_ERROR);
        throw runtime_error("DBRM: getUnique64() failed check the controllernode");
        return 0;
    }

    /* Some jobsteps don't need the connection after this so close it to free up
    resources on the controller node */
    /* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
    remove the client. Plus, it may cause weird network issue when the socket is being
    released and re-established very quickly*/
    //pthread_mutex_lock(&mutex);
    //delete msgClient;
    //msgClient = NULL;
    //pthread_mutex_unlock(&mutex);

    response >> err;

    if (err != ERR_OK)
    {
        cerr << "DBRM: getUnique64() failed (got an error)\n";
        log("DBRM: getUnique64() failed (got an error)",
            logging::LOG_TYPE_ERROR);
        throw runtime_error("DBRM: getUnique64() failed check the controllernode");
        return 0;
    }

    response >> ret;
// 	cerr << "DBRM returning " << ret << endl;
    return ret;
}

void DBRM::sessionmanager_reset()
{
    ByteStream command, response;
    command << SM_RESET;
    send_recv(command, response);
}

bool DBRM::isEMEmpty() throw()
{
    bool res = false;

    try
    {
        res = em->empty();
    }
    catch (...)
    {
        res = false;
    }

    return res;
}

vector<InlineLBIDRange> DBRM::getEMFreeListEntries() throw()
{
    vector<InlineLBIDRange> res;

    try
    {
        res = em->getFreeListEntries();
    }
    catch (...)
    {
        res.clear();
    }

    return res;
}

int DBRM::takeSnapshot() throw ()
{
    ByteStream command, response;
    uint8_t  err;

    command << TAKE_SNAPSHOT;
    err = send_recv(command, response);

    if (err != ERR_OK)
        return err;

    if (response.length() == 0)
        return ERR_NETWORK;

    return 0;
}

int DBRM::getSystemReady() throw()
{
    uint32_t stateFlags;

    if (getSystemState(stateFlags) < 0)
    {
        return -1;
    }

    return (stateFlags & SessionManagerServer::SS_READY);
}

int DBRM::getSystemQueryReady() throw()
{
    uint32_t stateFlags;

    if (getSystemState(stateFlags) < 0)
    {
        return -1;
    }

    return (stateFlags & SessionManagerServer::SS_QUERY_READY);
}

int DBRM::getSystemSuspended() throw()
{
    uint32_t stateFlags;

    if (getSystemState(stateFlags) < 0)
    {
        return -1;
    }

    return (stateFlags & SessionManagerServer::SS_SUSPENDED);
}

int DBRM::getSystemSuspendPending(bool& bRollback) throw()
{
    uint32_t stateFlags;

    if (getSystemState(stateFlags) < 0)
    {
        return -1;
    }

    bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;

    return (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING);
}

int DBRM::getSystemShutdownPending(bool& bRollback, bool& bForce) throw()
{
    uint32_t stateFlags;

    if (getSystemState(stateFlags) < 0)
    {
        return -1;
    }

    bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;
    bForce = stateFlags & SessionManagerServer::SS_FORCE;

    return (stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING);
}

int DBRM::setSystemReady(bool bReady) throw()
{
    if (bReady)
    {
        return setSystemState(SessionManagerServer::SS_READY);
    }
    else
    {
        return clearSystemState(SessionManagerServer::SS_READY);
    }
}

int DBRM::setSystemQueryReady(bool bReady) throw()
{
    if (bReady)
    {
        return setSystemState(SessionManagerServer::SS_QUERY_READY);
    }
    else
    {
        return clearSystemState(SessionManagerServer::SS_QUERY_READY);
    }
}

int DBRM::setSystemSuspended(bool bSuspended) throw()
{
    uint32_t stateFlags = 0;

    if (bSuspended)
    {
        if (setSystemState(SessionManagerServer::SS_SUSPENDED) < 0)
        {
            return -1;
        }
    }
    else
    {
        stateFlags = SessionManagerServer::SS_SUSPENDED;
    }

    // In either case, we need to clear the pending and rollback flags
    stateFlags |= SessionManagerServer::SS_SUSPEND_PENDING;
    stateFlags |= SessionManagerServer::SS_ROLLBACK;
    return clearSystemState(stateFlags);
}

int DBRM::setSystemSuspendPending(bool bPending, bool bRollback) throw()
{
    uint32_t stateFlags = SessionManagerServer::SS_SUSPEND_PENDING;

    if (bPending)
    {
        if (bRollback)
        {
            stateFlags |= SessionManagerServer::SS_ROLLBACK;
        }

        return setSystemState(stateFlags);
    }
    else
    {
        stateFlags |= SessionManagerServer::SS_ROLLBACK;
        return clearSystemState(stateFlags);
    }
}

int DBRM::setSystemShutdownPending(bool bPending, bool bRollback, bool bForce) throw()
{
    int rtn = 0;
    uint32_t stateFlags = SessionManagerServer::SS_SHUTDOWN_PENDING;

    if (bPending)
    {
        if (bForce)
        {
            stateFlags |= SessionManagerServer::SS_FORCE;
        }
        else if (bRollback)
        {
            stateFlags |= SessionManagerServer::SS_ROLLBACK;
        }

        rtn = setSystemState(stateFlags);
    }
    else
    {
        stateFlags |= SessionManagerServer::SS_ROLLBACK;
        stateFlags |= SessionManagerServer::SS_FORCE;
        rtn = clearSystemState(stateFlags);		// Clears the flags that are turned on in stateFlags
    }

    return rtn;
}

/* Return the shm stateflags
 */
int DBRM::getSystemState(uint32_t& stateFlags) throw()
{
    try
    {
#ifdef BRM_INFO

        if (fDebug)
        {
            TRACER_WRITELATER("getSystemState");
            TRACER_WRITE;
        }

#endif
        ByteStream command, response;
        uint8_t err;

        command << GET_SYSTEM_STATE;
        err = send_recv(command, response);

        if (err != ERR_OK)
        {
            std::ostringstream oss;
            oss << "DBRM: error: SessionManager::getSystemState() failed (network)";
            log(oss.str(), logging::LOG_TYPE_ERROR);
            return -1;
        }

        response >> err;

        if (err != ERR_OK)
        {
            std::ostringstream oss;
            oss << "DBRM: error: SessionManager::getSystemState() failed (error " << err << ")";
            log(oss.str(), logging::LOG_TYPE_ERROR);
            return -1;
        }

        response >> stateFlags;
        return 1;
    }
    catch (...)
    {
    }

    return -1;
}

/* Set the shm stateflags that are set in the parameter
 */
int DBRM::setSystemState(uint32_t stateFlags) throw()
{
    try
    {
#ifdef BRM_INFO

        if (fDebug)
        {
            TRACER_WRITELATER("setSystemState");
            TRACER_WRITE;
        }

#endif

        ByteStream command, response;
        uint8_t err;

        command << SET_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
        err = send_recv(command, response);

        if (err != ERR_OK)
        {
            std::ostringstream oss;
            oss << "DBRM: error: SessionManager::setSystemState() failed (network)";
            log(oss.str(), logging::LOG_TYPE_ERROR);
            stateFlags = 0;
            return -1;
        }

        response >> err;

        if (err != ERR_OK)
        {
            std::ostringstream oss;
            oss << "DBRM: error: SessionManager::setSystemState() failed (got an error)";
            log(oss.str(), logging::LOG_TYPE_ERROR);
            stateFlags = 0;
            return -1;
        }

        return 1;
    }
    catch (...)
    {
    }

    stateFlags = 0;
    return -1;
}

/* Clear the shm stateflags that are set in the parameter
 */
int DBRM::clearSystemState(uint32_t stateFlags) throw()
{
    try
    {
#ifdef BRM_INFO

        if (fDebug)
        {
            TRACER_WRITELATER("clearSystemState");
            TRACER_WRITE;
        }

#endif

        ByteStream command, response;
        uint8_t err;

        command << CLEAR_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
        err = send_recv(command, response);

        if (err != ERR_OK)
        {
            std::ostringstream oss;
            oss << "DBRM: error: SessionManager::clearSystemState() failed (network)";
            log(oss.str(), logging::LOG_TYPE_ERROR);
            return -1;
        }

        response >> err;

        if (err != ERR_OK)
        {
            std::ostringstream oss;
            oss << "DBRM: error: SessionManager::clearSystemState() failed (got an error)";
            log(oss.str(), logging::LOG_TYPE_ERROR);
            return -1;
        }

        return 1;
    }
    catch (...)
    {
    }

    return -1;
}

/* Ping the controller node. Don't print anything.
 */
bool DBRM::isDBRMReady() throw()
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("isDBRMReady");

#endif
    boost::mutex::scoped_lock scoped(mutex);

    try
    {
        for (int attempt = 0; attempt < 2; ++attempt)
        {
            try
            {
                if (msgClient == NULL)
                {
                    msgClient = MessageQueueClientPool::getInstance(masterName);
                }

                if (msgClient->connect())
                {
                    return true;
                }
            }
            catch (...)
            {
            }

            MessageQueueClientPool::releaseInstance(msgClient);
            msgClient = NULL;
            sleep(1);
        }
    }
    catch (...)
    {
    }

    return false;
}

/* This waits for the lock up to 30 sec.  After 30 sec, the assumption is something
 * bad happened, and this will fix the lock state so that primproc can keep
 * running.  These prevent a non-critical problem anyway.
 */
void DBRM::lockLBIDRange(LBID_t start, uint32_t count)
{
    bool locked = false, lockedRange = false;
    LBIDRange range;
    const uint32_t waitInterval = 50000;  // in usec
    const uint32_t maxRetries = 30000000 / waitInterval; // 30 secs
    uint32_t retries = 0;

    range.start = start;
    range.size = count;

    try
    {
        copylocks->lock(CopyLocks::WRITE);
        locked = true;

        while (copylocks->isLocked(range) && retries < maxRetries)
        {
            copylocks->release(CopyLocks::WRITE);
            locked = false;
            usleep(waitInterval);
            retries++;
            copylocks->lock(CopyLocks::WRITE);
            locked = true;
        }

        if (retries >= maxRetries)
            copylocks->forceRelease(range);

        copylocks->lockRange(range, -1);
        lockedRange = true;
        copylocks->confirmChanges();
        copylocks->release(CopyLocks::WRITE);
        locked = false;
    }
    catch (...)
    {
        if (lockedRange)
            copylocks->releaseRange(range);

        if (locked)
        {
            copylocks->confirmChanges();
            copylocks->release(CopyLocks::WRITE);
        }

        throw;
    }
}

void DBRM::releaseLBIDRange(LBID_t start, uint32_t count)
{
    bool locked = false;
    LBIDRange range;
    range.start = start;
    range.size = count;

    try
    {
        copylocks->lock(CopyLocks::WRITE);
        locked = true;
        copylocks->releaseRange(range);
        copylocks->confirmChanges();
        copylocks->release(CopyLocks::WRITE);
        locked = false;
    }
    catch (...)
    {
        if (locked)
        {
            copylocks->confirmChanges();
            copylocks->release(CopyLocks::WRITE);
        }

        throw;
    }
}

/* OID Manager section */

int DBRM::allocOIDs(int num)
{
#ifdef BRM_INFO

    if (fDebug) TRACER_WRITENOW("allocOID");

#endif
    ByteStream command, response;
    uint8_t err;
    uint32_t ret;

    command << ALLOC_OIDS;
    command << (uint32_t) num;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: OIDManager::allocOIDs(): network error" << endl;
        log("DBRM: OIDManager::allocOIDs(): network error", logging::LOG_TYPE_CRITICAL);
        return -1;
    }

    try
    {
        response >> err;

        if (err != ERR_OK)
            return -1;

        response >> ret;
        CHECK_EMPTY(response);
        return (int) ret;
    }
    catch (...)
    {
        log("DBRM: OIDManager::allocOIDs(): bad response", logging::LOG_TYPE_CRITICAL);
        return -1;
    }
}

void DBRM::returnOIDs(int start, int end)
{
    ByteStream command, response;
    uint8_t err;

    command << RETURN_OIDS;
    command << (uint32_t) start;
    command << (uint32_t) end;
    err = send_recv(command, response);

    if (err == ERR_NETWORK)
    {
        cerr << "DBRM: OIDManager::returnOIDs(): network error" << endl;
        log("DBRM: OIDManager::returnOIDs(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: OIDManager::returnOIDs(): network error");
    }

    try
    {
        response >> err;
        CHECK_EMPTY(response);
    }
    catch (...)
    {
        err = ERR_FAILURE;
    }

    if (err != ERR_OK)
    {
        log("DBRM: OIDManager::returnOIDs() failed", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: OIDManager::returnOIDs() failed");
    }
}

int DBRM::oidm_size()
{
    ByteStream command, response;
    uint8_t err;
    uint32_t ret;

    command << OIDM_SIZE;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: OIDManager::size(): network error" << endl;
        log("DBRM: OIDManager::size(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: OIDManager::size(): network error");
    }

    try
    {
        response >> err;

        if (err == ERR_OK)
        {
            response >> ret;
            CHECK_EMPTY(response);
            return ret;
        }

        CHECK_EMPTY(response);
        return -1;
    }
    catch (...)
    {
        log("DBRM: OIDManager::size(): bad response", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: OIDManager::size(): bad response");
    }
}

int DBRM::allocVBOID(uint32_t dbroot)
{
    ByteStream command, response;
    uint8_t err;
    uint32_t ret;

    command << ALLOC_VBOID << (uint32_t) dbroot;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: OIDManager::allocVBOID(): network error" << endl;
        log("DBRM: OIDManager::allocVBOID(): network error", logging::LOG_TYPE_CRITICAL);
        return -1;
    }

    try
    {
        response >> err;

        if (err == ERR_OK)
        {
            response >> ret;
            CHECK_EMPTY(response);
            return ret;
        }

        CHECK_EMPTY(response);
        return -1;
    }
    catch (...)
    {
        log("DBRM: OIDManager::allocVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
        return -1;
    }
}

int DBRM::getDBRootOfVBOID(uint32_t vbOID)
{
    ByteStream command, response;
    uint8_t err;
    uint32_t ret;

    command << GETDBROOTOFVBOID << (uint32_t) vbOID;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        cerr << "DBRM: OIDManager::getDBRootOfVBOID(): network error" << endl;
        log("DBRM: OIDManager::getDBRootOfVBOID(): network error", logging::LOG_TYPE_CRITICAL);
        return -1;
    }

    try
    {
        response >> err;

        if (err == ERR_OK)
        {
            response >> ret;
            CHECK_EMPTY(response);
            return (int) ret;
        }

        CHECK_EMPTY(response);
        return -1;
    }
    catch (...)
    {
        log("DBRM: OIDManager::getDBRootOfVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
        return -1;
    }
}

vector<uint16_t> DBRM::getVBOIDToDBRootMap()
{
    ByteStream command, response;
    uint8_t err;
    vector<uint16_t> ret;

    command << GETVBOIDTODBROOTMAP;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: OIDManager::getVBOIDToDBRootMap(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): network error");
    }

    try
    {
        response >> err;

        if (err != ERR_OK)
        {
            log("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error", logging::LOG_TYPE_CRITICAL);
            throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error");
        }

        deserializeInlineVector<uint16_t>(response, ret);
        CHECK_EMPTY(response);
        return ret;
    }
    catch (...)
    {
        log("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response");
    }
}

uint64_t DBRM::getTableLock(const vector<uint32_t>& pmList, uint32_t tableOID,
                            string* ownerName, uint32_t* ownerPID, int32_t* ownerSessionID, int32_t* ownerTxnID, LockState state)
{
    ByteStream command, response;
    uint8_t err;
    uint64_t ret;
    TableLockInfo tli;
    uint32_t tmp32;
    vector<uint32_t> dbRootsList;
    OamCache* oamcache = OamCache::makeOamCache();
    OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap();
    int moduleId = 0;

    for (uint32_t i = 0; i < pmList.size(); i++)
    {
        moduleId = pmList[i];
        vector<int> dbroots = (*pmDbroots)[moduleId];

        for (uint32_t j = 0; j < dbroots.size(); j++)
            dbRootsList.push_back((uint32_t)dbroots[j]);
    }

    tli.id = 0;
    tli.ownerName = *ownerName;
    tli.ownerPID = *ownerPID;
    tli.ownerSessionID = *ownerSessionID;
    tli.ownerTxnID = *ownerTxnID;
    tli.dbrootList = dbRootsList;
    tli.state = state;
    tli.tableOID = tableOID;
    tli.creationTime = time(NULL);

    command << GET_TABLE_LOCK << tli;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: getTableLock(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getTableLock(): network error");
    }

    response >> err;

    /* TODO: this means a save failure, need a specific exception type */
    if (err != ERR_OK)
        throw runtime_error("Table lock save file failure");

    response >> ret;

    if (ret == 0)
    {
        response >> *ownerPID;
        response >> *ownerName;
        response >> tmp32;
        *ownerSessionID = tmp32;
        response >> tmp32;
        *ownerTxnID = tmp32;
    }

    idbassert(response.length() == 0);
    return ret;
}

bool DBRM::releaseTableLock(uint64_t id)
{
    ByteStream command, response;
    uint8_t err;

    command << RELEASE_TABLE_LOCK << id;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: releaseTableLock(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: releaseTableLock(): network error");
    }

    response >> err;

    /* TODO: this means a save failure, need a specific exception type */
    if (err != ERR_OK)
        throw runtime_error("Table lock save file failure");

    response >> err;
    idbassert(response.length() == 0);

    return (bool) err;
}

bool DBRM::changeState(uint64_t id, LockState state)
{
    ByteStream command, response;
    uint8_t err;

    command << CHANGE_TABLE_LOCK_STATE << id << (uint32_t) state;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: changeState(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: changeState(): network error");
    }

    response >> err;

    /* TODO: this means a save failure, need a specific exception type */
    if (err != ERR_OK)
        throw runtime_error("Table lock save file failure");

    response >> err;
    idbassert(response.length() == 0);

    return (bool) err;
}

bool DBRM::changeOwner(uint64_t id, const string& ownerName, uint32_t ownerPID, int32_t ownerSessionID,
                       int32_t ownerTxnID)
{
    ByteStream command, response;
    uint8_t err;

    command << CHANGE_TABLE_LOCK_OWNER << id << ownerName << ownerPID <<
            (uint32_t) ownerSessionID << (uint32_t) ownerTxnID;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: changeOwner(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: changeOwner(): network error");
    }

    response >> err;

    /* TODO: this means a save failure, need a specific exception type */
    if (err != ERR_OK)
        throw runtime_error("Table lock save file failure");

    response >> err;
    idbassert(response.length() == 0);
    return (bool) err;
}

bool DBRM::checkOwner(uint64_t id)
{
    ByteStream command, response;
    uint8_t err;

    command << OWNER_CHECK << id;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: ownerCheck(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: ownerCheck(): network error");
    }

    response >> err;

    /* TODO: this means a save failure, need a specific exception type */
    if (err != ERR_OK)
        throw runtime_error("Table lock save file failure");

    response >> err;
    idbassert(response.length() == 0);
    return (bool) err;  // Return true means the owner is valid
}

vector<TableLockInfo> DBRM::getAllTableLocks()
{
    ByteStream command, response;
    uint8_t err;
    vector<TableLockInfo> ret;

    command << GET_ALL_TABLE_LOCKS;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: getAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getAllTableLocks(): network error");
    }

    response >> err;

    if (err != ERR_OK)
    {
        log("DBRM: getAllTableLocks(): processing error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getAllTableLocks(): processing error");
    }

    deserializeVector<TableLockInfo>(response, ret);
    idbassert(response.length() == 0);
    return ret;
}

void DBRM::releaseAllTableLocks()
{
    ByteStream command, response;
    uint8_t err;

    command << RELEASE_ALL_TABLE_LOCKS;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: releaseAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: releaseAllTableLocks(): network error");
    }

    response >> err;
    idbassert(response.length() == 0);

    if (err != ERR_OK)
        throw runtime_error("DBRM: releaseAllTableLocks(): processing error");
}

bool DBRM::getTableLockInfo(uint64_t id, TableLockInfo* tli)
{
    ByteStream command, response;
    uint8_t err;

    command << GET_TABLE_LOCK_INFO << id;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: getTableLockInfo(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getTableLockInfo(): network error");
    }

    response >> err;

    if (err != ERR_OK)
        throw runtime_error("DBRM: getTableLockInfo() processing error");

    response >> err;

    if (err)
        response >> *tli;

    return (bool) err;
}

void DBRM::startAISequence(uint32_t OID, uint64_t firstNum, uint32_t colWidth,
                           execplan::CalpontSystemCatalog::ColDataType colDataType)
{
    ByteStream command, response;
    uint8_t err;
    uint8_t tmp8 = colDataType;

    command << START_AI_SEQUENCE << OID << firstNum << colWidth << tmp8;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: startAISequence(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: startAISequence(): network error");
    }

    response >> err;
    idbassert(response.length() == 0);

    if (err != ERR_OK)
    {
        log("DBRM: startAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: startAISequence(): processing error");
    }
}

bool DBRM::getAIRange(uint32_t OID, uint32_t count, uint64_t* firstNum)
{
    ByteStream command, response;
    uint8_t err;

    command << GET_AI_RANGE << OID << count;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: getAIRange(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getAIRange(): network error");
    }

    response >> err;

    if (err != ERR_OK)
    {
        log("DBRM: getAIRange(): processing error",	logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getAIRange(): processing error");
    }

    response >> err;

    if (err == 0)
        return false;

    response >> *firstNum;
    idbassert(response.length() == 0);
    return true;
}

bool DBRM::getAIValue(uint32_t OID, uint64_t* value)
{
    return getAIRange(OID, 0, value);
}

void DBRM::resetAISequence(uint32_t OID, uint64_t value)
{
    ByteStream command, response;
    uint8_t err;

    command << RESET_AI_SEQUENCE << OID << value;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: resetAISequence(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: resetAISequence(): network error");
    }

    response >> err;
    idbassert(response.length() == 0);

    if (err != ERR_OK)
    {
        log("DBRM: resetAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: resetAISequence(): processing error");
    }
}

void DBRM::getAILock(uint32_t OID)
{
    ByteStream command, response;
    uint8_t err;

    command << GET_AI_LOCK << OID;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: getAILock(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getAILock(): network error");
    }

    response >> err;
    idbassert(response.length() == 0);

    if (err != ERR_OK)
    {
        log("DBRM: getAILock(): processing error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: getAILock(): processing error");
    }
}

void DBRM::releaseAILock(uint32_t OID)
{
    ByteStream command, response;
    uint8_t err;

    command << RELEASE_AI_LOCK << OID;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM: releaseAILock(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: releaseAILock(): network error");
    }

    response >> err;
    idbassert(response.length() == 0);

    if (err != ERR_OK)
    {
        log("DBRM: releaseAILock(): processing error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: releaseAILock(): processing error");
    }
}

void DBRM::deleteAISequence(uint32_t OID)
{
    ByteStream command, response;
    uint8_t err;

    command << DELETE_AI_SEQUENCE << OID;
    err = send_recv(command, response);

    if (err != ERR_OK)
    {
        log("DBRM:deleteAILock(): network error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: deleteAILock(): network error");
    }

    response >> err;
    idbassert(response.length() == 0);

    if (err != ERR_OK)
    {
        log("DBRM: deleteAILock(): processing error", logging::LOG_TYPE_CRITICAL);
        throw runtime_error("DBRM: deleteAILock(): processing error");
    }
}

void DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN txnid, vector<LBID_t>* plbidList)
{
    // Here we want to minimize the number of calls to dbrm
    // Given that, and the fact that we need to know the column type
    // in order to set the invalid min and max correctly in the extents,
    // We do the following:
    // 1) Maintain a vector of all extents we've looked at.
    // 2) Get the list of uncommitted lbids for the transaction.
    // 3) Look in that list to see if we've already looked at this extent.
    // 4) If not,
    //    a) lookup the min and max lbid for the extent it belongs to
    //    b) lookup the column oid for that lbid
    //    c) add to the vector of extents
    // 5) Create a list of CPInfo structures with the first lbid and col type of each extent
    // 6) Lookup the column type for each retrieved oid.
    // 7) mark each extent invalid, just like we would during update. This sets the proper
    //    min and max (and set the state to CP_UPDATING.
    // 6) Call setExtentsMaxMin to set the state to CP_INVALID.

    vector<LBID_t> localLBIDList;

    boost::shared_ptr<execplan::CalpontSystemCatalog> csc;
    CPInfoList_t cpInfos;
    CPInfo aInfo;
    int oid;
    uint16_t dbRoot;
    uint32_t partitionNum;
    uint16_t segmentNum;
    uint32_t fileBlockOffset;

    // 2) Get the list of uncommitted lbids for the transaction, if we weren't given one.
    if (plbidList == NULL)
    {
        getUncommittedExtentLBIDs(static_cast<VER_t>(txnid), localLBIDList);
        plbidList = &localLBIDList;
    }

    if (plbidList->size() == 0)
    {
        return; // Nothing to do.
    }

    vector<LBID_t>::const_iterator iter = plbidList->begin();
    vector<LBID_t>::const_iterator end = plbidList->end();
    csc = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog();

    for (; iter != end; ++iter)
    {
        LBID_t lbid = *iter;
        aInfo.firstLbid = lbid;

        // lookup the column oid for that lbid (all we care about is oid here)
        if (em->lookupLocal(lbid, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset) == 0)
        {
            if (execplan::isUnsigned(csc->colType(oid).colDataType))
            {
                aInfo.max = 0;
                aInfo.min = numeric_limits<uint64_t>::max();
            }
            else
            {
                aInfo.max = numeric_limits<int64_t>::min();
                aInfo.min = numeric_limits<int64_t>::max();
            }
        }
        else
        {
            // We have a problem, but we need to put something in. This should never happen.
            aInfo.max = numeric_limits<int64_t>::min();
            aInfo.min = numeric_limits<int64_t>::max();
        }

        aInfo.seqNum = -2;
        cpInfos.push_back(aInfo);
    }

    // Call setExtentsMaxMin to invalidate and set the proper max/min in each extent
    setExtentsMaxMin(cpInfos);
}

}   //namespace
